package sbox

import (
	"fmt"

	"gitee.com/dennis-kk/rpc-go-backend/idlrpc"
	"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/errors"
	"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/protocol"
	"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/transport"
	"gitee.com/dennis-kk/service-box-go/util/config/reader"
	perror "gitee.com/dennis-kk/service-box-go/util/errors"
	"gitee.com/dennis-kk/service-box-go/util/http_proxy"
	"gitee.com/dennis-kk/service-box-go/util/slog"
	"gitee.com/dennis-kk/service-box-go/util/tools"
)

// Task kinds stored in proxyTask.taskType and dispatched by ProxyHandler.onTask.
const (
	// addInsideChan asks onTask to cache an inside service transport for an
	// outside connection.
	addInsideChan = iota
	// transHttpCallFailed asks onTask to report a failed http call back to the
	// http proxy.
	transHttpCallFailed
)

type (
	outsideTransMap    map[protocol.GlobalIndexType]transport.ITransport // outside connections keyed by global index
	serviceTransMap    map[uint64]transport.ITransport                   // inside service transports keyed by service uuid
	outsideRefTransMap map[protocol.GlobalIndexType]serviceTransMap      // per outside connection cache of resolved service transports
	insideCallMap      map[uint32]*callInfo                              // pending inside-to-outside calls keyed by replaced call id

	// callInfo records one proxied call issued by an inside service towards an
	// outside connection, so that the response can be routed back.
	callInfo struct {
		// replaceCallId is the call id written into the request sent outside.
		replaceCallId uint32
		// originalID is the call id the inside caller used.
		originalID uint32
		// inside is the transport the call came from.
		inside transport.ITransport
	}

	// proxyTask is a unit of work queued on ProxyHandler.ch and executed by
	// onTask during onTick.
	proxyTask struct {
		// taskType is addInsideChan or transHttpCallFailed.
		taskType int
		// globalIndex identifies the outside connection the task refers to.
		globalIndex protocol.GlobalIndexType
		// serviceUuid identifies the target service.
		serviceUuid uint64
		// callId is the call the task refers to (used by transHttpCallFailed).
		callId uint32
		// inside is the service transport to cache (used by addInsideChan).
		inside *BoxChannel
	}

	// proxyModeCfg is the yaml configuration of proxy mode.
	proxyModeCfg struct {
		Address     string `yaml:"host"`
		ConnectSize uint32 `yaml:"cache_size"`
		Binding     bool   `yaml:"binding"` // whether binding mode is enabled
	}

	// httpProxy extends the http proxy with a global index.
	httpProxy struct {
		*http_proxy.HttpProxy                          // embedded http proxy
		globalIndex           protocol.GlobalIndexType // global index standing in for the outside connection
	}

	// ProxyHandler relays rpc messages between outside connections and inside
	// services, and optionally serves as an http gateway.
	ProxyHandler struct {
		gTransMap      outsideTransMap    // index of outside connections
		gSrvTransCache outsideRefTransMap // services bound to each outside connection, used in binding mode
		gCallInfo      insideCallMap      // pending call list
		cfg            *proxyModeCfg      // configuration
		owner          *ServiceBox        // owning box
		replaceId      uint32             // call id counter used to replace the id of inside-to-outside requests
		httpHandle     *httpProxy         // http gateway handle
		// ch carries tasks that onTick hands to onTask.
		ch chan *proxyTask
	}
)

// newProxyHandler creates a ProxyHandler for box using cfg. All lookup maps
// start empty, the call id counter starts at 1 and the task channel is
// buffered with room for 64 tasks. The http handle is left unset.
func newProxyHandler(cfg *proxyModeCfg, box *ServiceBox) *ProxyHandler {
	handler := new(ProxyHandler)
	handler.cfg = cfg
	handler.owner = box
	handler.replaceId = 1
	handler.gTransMap = outsideTransMap{}
	handler.gSrvTransCache = outsideRefTransMap{}
	handler.gCallInfo = insideCallMap{}
	handler.ch = make(chan *proxyTask, 64)
	return handler
}

// preProcess replaces Address with the value tools.ParseIp returns for it.
// When ParseIp fails its error is returned and Address is left untouched.
func (p *proxyModeCfg) preProcess() error {
	parsed, err := tools.ParseIp(p.Address)
	if err != nil {
		return err
	}
	p.Address = parsed
	return nil
}

// NewReplaceID advances the handler's call id counter by one and returns the
// new value. The method performs no locking of its own.
func (p *ProxyHandler) NewReplaceID() uint32 {
	p.replaceId++
	return p.replaceId
}

// initHttpHandle creates the http gateway handle from cfg and registers
// onHttpCall as its request handler.
//
// The proxy is built and validated before anything is stored on p, so a
// failed initialization leaves p.httpHandle nil. The other methods use
// "p.httpHandle != nil" as their "http module enabled" check; storing a
// wrapper around a nil proxy would make them call into a nil proxy.
//
// It returns an error when http_proxy.MakeHttpProxy yields nil.
func (p *ProxyHandler) initHttpHandle(cfg reader.Value) error {
	proxy := http_proxy.MakeHttpProxy(
		func(options *http_proxy.Options) {
			cfg.Scan(options)
		},
		http_proxy.WithCustomLogger(p.owner.logger))
	if proxy == nil {
		return fmt.Errorf("init http proxy module error")
	}

	p.httpHandle = &httpProxy{
		HttpProxy: proxy,
		// The http gateway gets its own global index so that responses for
		// http calls can be told apart from responses for outside connections.
		globalIndex: protocol.GlobalIndexType(p.owner.idGen.New32UUID()),
	}
	p.httpHandle.SetHttpRequestHandle(p.onHttpCall)

	return nil
}

// startHttpHandle starts the http gateway and returns the result of its
// Start call. It returns nil when the http module was never initialized.
func (p *ProxyHandler) startHttpHandle() error {
	if handle := p.httpHandle; handle != nil {
		return handle.Start()
	}
	// http module not initialized, nothing to start
	return nil
}

// shutdownHttpHandle shuts the http gateway down. It does nothing when the
// http module was never initialized.
func (p *ProxyHandler) shutdownHttpHandle() {
	if handle := p.httpHandle; handle != nil {
		handle.ShutDown()
	}
}

// onAccept registers a newly accepted outside connection: it draws a fresh
// global index from the box id generator, stamps it on trans and indexes
// trans under that value in gTransMap.
func (p *ProxyHandler) onAccept(name string, trans *BoxChannel) {
	box := p.owner
	index := protocol.GlobalIndexType(box.idGen.New32UUID())

	trans.SetGlobalIndex(index)
	p.gTransMap[index] = trans

	box.logger.Info("[Proxy] %s outside trans id: %v connect to server", name, index)
}

// onClose releases everything the handler keeps for a closed connection:
// the services bound to it and its entry in the outside connection index.
//
// Without the second step an entry added by onAccept would only be removed
// if a later relay happened to target the closed connection, so connections
// that are never addressed again would stay in gTransMap forever.
//
// Channels without a valid global index are ignored.
func (p *ProxyHandler) onClose(name string, trans *BoxChannel) {
	index := trans.GlobalIndex()
	if index == idlrpc.InvalidGlobalIndex {
		return
	}

	delete(p.gSrvTransCache, index)

	// Only drop the index entry if it still refers to this very channel.
	if cur, ok := p.gTransMap[index]; ok && cur == transport.ITransport(trans) {
		delete(p.gTransMap, index)
	}
}

// OnRelay dispatches a message arriving on from to the handler matching
// header.Type: plain requests and responses, or proxy requests and responses.
//
// An unknown message type is logged and answered with errors.ErrInvalidProto.
// An error from the selected handler is logged and returned unchanged.
func (p *ProxyHandler) OnRelay(from transport.ITransport, header *protocol.RpcMsgHeader) error {
	var relay func(transport.ITransport) error

	switch header.Type {
	case protocol.RequestMsg:
		relay = p.onRpcCall
	case protocol.ResponseMsg:
		relay = p.onRpcReturn
	case protocol.ProxyRequestMsg:
		relay = p.onProxyCall
	case protocol.ProxyResponseMsg:
		relay = p.onProxyReturn
	default:
		p.owner.logger.Warn("unsupported plato rpc message type from %d:%s ", from.GlobalIndex(), from.RemoteAddr())
		return errors.ErrInvalidProto
	}

	err := relay(from)
	if err == nil {
		return nil
	}
	slog.Warn("[Proxy] conn %s type %d trans error %v !", from.RemoteAddr(), header.Type, err)
	return err
}

// onTick drives one round of periodic work: it ticks the http gateway when
// one is configured and then runs every task currently queued on p.ch,
// returning as soon as the queue is empty instead of blocking.
func (p *ProxyHandler) onTick() {
	// tick pending http requests
	if handle := p.httpHandle; handle != nil {
		handle.Tick()
	}

	// run queued tasks
	for {
		var task *proxyTask
		select {
		case task = <-p.ch:
		default:
			return
		}
		p.onTask(task)
	}
}

// onTask executes one task taken from p.ch.
//
//   - addInsideChan stores t.inside as the transport of service
//     t.serviceUuid for outside connection t.globalIndex.
//   - transHttpCallFailed hands the http gateway a body-less proxy response
//     carrying protocol.IDL_SERVICE_NOT_FOUND for call t.callId.
//
// Tasks of any other type are ignored.
func (p *ProxyHandler) onTask(t *proxyTask) {
	switch t.taskType {
	case addInsideChan:
		cache, ok := p.gSrvTransCache[t.globalIndex]
		if !ok {
			cache = make(serviceTransMap)
			p.gSrvTransCache[t.globalIndex] = cache
		}
		cache[t.serviceUuid] = t.inside
	case transHttpCallFailed:
		// handle a failed http call
		if p.httpHandle == nil {
			// Log service uuid and call id; the call id is what identifies the request.
			p.owner.logger.Warn("no effective http proxy, but received transHttpCallFailed %d:%d ", t.serviceUuid, t.callId)
			return
		}
		// build the response
		ret := &protocol.ProxyRespPackage{
			Header: &protocol.RpcProxyCallRetHeader{
				RpcMsgHeader: protocol.RpcMsgHeader{
					Length: uint32(protocol.ProxyRetHeadSize),
					Type:   protocol.ProxyResponseMsg,
				},
				CallID:      t.callId,
				ErrorCode:   protocol.IDL_SERVICE_NOT_FOUND,
				GlobalIndex: t.globalIndex,
			},
		}
		// There is no caller to return the error to, so at least log it.
		if err := p.httpHandle.OnRpcResponse(ret); err != nil {
			p.owner.logger.Warn("[Http] report failed http call %d service %d error %v", t.callId, t.serviceUuid, err)
		}
	}
}

// onRpcCall reads one plain rpc request (call header plus body) from trans
// and forwards it to an inside service through outsideCallInside.
//
// It returns errors.ErrIllegalProto when the header or body cannot be read in
// full or when the header announces a total length shorter than the header
// itself, and errors.ErrIllegalReq when the header cannot be decoded.
func (p *ProxyHandler) onRpcCall(trans transport.ITransport) error {
	// read the fixed-size call header
	head := make([]byte, protocol.CallHeadSize)
	if n, err := trans.Read(head, protocol.CallHeadSize); err != nil || n != protocol.CallHeadSize {
		return errors.ErrIllegalProto
	}
	header := protocol.ReadCallHeader(head)
	if header == nil {
		return errors.ErrIllegalReq
	}

	// Length comes from the peer; a value below the header size would make
	// the body length negative and make() would panic.
	bodyLen := int(header.Length) - protocol.CallHeadSize
	if bodyLen < 0 {
		return errors.ErrIllegalProto
	}

	body := make([]byte, bodyLen)
	if n, err := trans.Read(body, bodyLen); err != nil || n != bodyLen {
		return errors.ErrIllegalProto
	}

	return p.outsideCallInside(trans, header, body)
}

// onRpcReturn reads one plain rpc response (return header plus body) from
// from and routes it back to the inside caller through outsideReturnInside.
//
// It returns errors.ErrIllegalProto when the header or body cannot be read in
// full or when the header announces a total length shorter than the header
// itself, and errors.ErrIllegalReq when the header cannot be decoded.
func (p *ProxyHandler) onRpcReturn(from transport.ITransport) error {
	// read the fixed-size return header
	head := make([]byte, protocol.RespHeadSize)
	if n, err := from.Read(head, protocol.RespHeadSize); err != nil || n != protocol.RespHeadSize {
		return errors.ErrIllegalProto
	}
	header := protocol.ReadRetHeader(head)
	if header == nil {
		return errors.ErrIllegalReq
	}

	// Length comes from the peer; a value below the header size would make
	// the body length negative and make() would panic.
	bodyLen := int(header.Length) - protocol.RespHeadSize
	if bodyLen < 0 {
		return errors.ErrIllegalProto
	}

	body := make([]byte, bodyLen)
	if n, err := from.Read(body, bodyLen); err != nil || n != bodyLen {
		return errors.ErrIllegalProto
	}

	return p.outsideReturnInside(from, header, body)
}

// onProxyCall reads one proxy request (proxy call header plus body) from
// from and forwards it to the addressed outside connection through
// insideCallOutSide.
//
// A request addressed to the http gateway's global index is rejected: the
// caller is answered with protocol.IDL_SERVICE_ERROR and
// errors.ErrIllegalReq is returned.
//
// It returns errors.ErrIllegalProto when the header or body cannot be read in
// full or when the header announces a total length shorter than the header
// itself, and errors.ErrIllegalReq when the header cannot be decoded.
func (p *ProxyHandler) onProxyCall(from transport.ITransport) error {
	// read the fixed-size proxy call header
	head := make([]byte, protocol.ProxyCallHeadSize)
	if n, err := from.Read(head, protocol.ProxyCallHeadSize); err != nil || n != protocol.ProxyCallHeadSize {
		return errors.ErrIllegalProto
	}
	msgHeader := protocol.ReadProxyCallHeader(head)
	if msgHeader == nil {
		return errors.ErrIllegalReq
	}

	// Length comes from the peer; a value below the header size would make
	// the body length negative and make() would panic.
	bodyLen := int(msgHeader.Length) - protocol.ProxyCallHeadSize
	if bodyLen < 0 {
		return errors.ErrIllegalProto
	}

	data := make([]byte, bodyLen)
	if n, err := from.Read(data, bodyLen); err != nil || n != bodyLen {
		return errors.ErrIllegalProto
	}

	// intercept calls from inside services that target the http client
	if p.httpHandle != nil && p.httpHandle.globalIndex == msgHeader.GlobalIndex {
		p.outsideReturnError(from, msgHeader, protocol.IDL_SERVICE_ERROR)
		p.owner.logger.Error("inside service can't call http client call id %d", msgHeader.CallID)
		return errors.ErrIllegalReq
	}

	return p.insideCallOutSide(from, msgHeader, data)
}

// onProxyReturn reads one proxy response (proxy return header plus body)
// from from. A response addressed to the http gateway's global index is
// handed to the http gateway; any other response is relayed to the outside
// connection through insideReturnOutside.
//
// It returns errors.ErrIllegalProto when the header or body cannot be read in
// full or when the header announces a total length shorter than the header
// itself, and errors.ErrIllegalReq when the header cannot be decoded.
func (p *ProxyHandler) onProxyReturn(from transport.ITransport) error {
	// read the fixed-size proxy return header
	head := make([]byte, protocol.ProxyRetHeadSize)
	if n, err := from.Read(head, protocol.ProxyRetHeadSize); err != nil || n != protocol.ProxyRetHeadSize {
		return errors.ErrIllegalProto
	}
	msgHeader := protocol.ReadProxyRetHeader(head)
	if msgHeader == nil {
		return errors.ErrIllegalReq
	}

	// Length comes from the peer; a value below the header size would make
	// the body length negative and make() would panic.
	bodyLen := int(msgHeader.Length) - protocol.ProxyRetHeadSize
	if bodyLen < 0 {
		return errors.ErrIllegalProto
	}

	retMsg := &protocol.ProxyRespPackage{
		Header: msgHeader,
		Buffer: make([]byte, bodyLen),
	}
	if n, err := from.Read(retMsg.Buffer, bodyLen); err != nil || n != bodyLen {
		return errors.ErrIllegalProto
	}

	if p.httpHandle != nil && p.httpHandle.globalIndex == retMsg.Header.GlobalIndex {
		// http call: hand over to the http gateway
		return p.httpHandle.OnRpcResponse(retMsg)
	}

	// rpc call: relay to the outside connection
	return p.insideReturnOutside(from, retMsg.Header, retMsg.Buffer)
}

// onHttpCall is the request handler registered with the http gateway.
//
// It assigns a fresh call id and the gateway's global index to req, then
// resolves the target service and sends the request from a separate
// goroutine. The call id is returned immediately; the returned error is
// always nil.
//
// When the service cannot be resolved or the request cannot be sent, a
// transHttpCallFailed task is queued so that onTask answers the http call
// instead of leaving it pending. The task carries the gateway's global index
// because onTask copies it into the response header.
func (p *ProxyHandler) onHttpCall(req *protocol.ProxyRequestPackage) (uint32, error) {
	// Snapshot everything the goroutine needs besides req itself.
	callId := p.NewReplaceID()
	globalIndex := p.httpHandle.globalIndex
	serviceUUID := req.Header.ServiceUUID

	req.Header.GlobalIndex = globalIndex
	req.Header.CallID = callId

	// service discovery runs in its own goroutine
	go func() {
		reportFailure := func() {
			p.ch <- &proxyTask{
				taskType:    transHttpCallFailed,
				globalIndex: globalIndex,
				serviceUuid: serviceUUID,
				callId:      callId,
			}
		}

		to, err := p.owner.GetTransport(serviceUUID)
		// target service not found
		if err != nil || to == nil {
			reportFailure()
			slog.Warn("[Http]  http call %d service %d failed  ! ", callId, serviceUUID)
			return
		}

		// pack and send; the second result of the pack call is discarded as
		// everywhere else in this file
		buffer, _ := protocol.PackProxyReqMsg(req)
		if err := to.Send(buffer); err != nil {
			// No response will ever arrive for this call, so fail it now.
			reportFailure()
			slog.Warn("[Http]  http call %d send to service %d error %v ! ", callId, serviceUUID, err)
		}
	}()

	return callId, nil
}

// outsideCallInside forwards a call from an outside connection to an inside
// service.
//
// In binding mode the service transport cached for the connection is tried
// first; only when none is cached does service discovery run. Outside
// binding mode discovery runs for every call. Discovery and the following
// send happen in a separate goroutine, so in that case the method returns nil
// before the outcome is known.
//
// Whenever the service cannot be resolved or the forward fails, the outside
// caller is answered with a not-found response. The synchronous path also
// returns the send error.
func (p *ProxyHandler) outsideCallInside(from transport.ITransport, header *protocol.RpcCallHeader, data []byte) error {
	// replyNotFound answers the outside caller with a not-found response.
	replyNotFound := func() {
		pkg, _ := protocol.PackRespMsg(protocol.BuildNotFound(header))
		if err := from.Send(pkg); err != nil {
			slog.Warn("[Proxy] outside conn %v : send not found for call %d error %v !", from.GlobalIndex(), header.CallID, err)
		}
	}

	var to transport.ITransport
	// binding mode: try the cache first, otherwise always rediscover
	if p.cfg.Binding {
		to = p.tryGetService(from.GlobalIndex(), header.ServiceUUID)
	}

	if to != nil {
		err := p.transProxyCall(from, to, header, data)
		if err != nil {
			// corner case: the peer closed right after the transport was fetched
			replyNotFound()
			slog.Warn("[Proxy] outside conn %v : trans call %d to service %d error !", from.GlobalIndex(), header.CallID, header.ServiceUUID)
		}
		return err
	}

	go func() {
		inside, err := p.owner.GetTransport(header.ServiceUUID)
		// A nil transport without an error is treated as not found too, as
		// onHttpCall does; using it would dereference a nil channel.
		if err != nil || inside == nil {
			replyNotFound()
			slog.Warn("[Proxy] outside conn %v proxy find service %d error ! ", from.GlobalIndex(), header.ServiceUUID)
			return
		}

		err = p.transProxyCall(from, inside, header, data)

		if p.cfg.Binding {
			// binding mode: hand the transport to the tick loop for caching
			p.ch <- &proxyTask{
				taskType:    addInsideChan,
				globalIndex: from.GlobalIndex(),
				serviceUuid: header.ServiceUUID,
				inside:      inside,
			}
		}

		if err != nil {
			// Answer the caller just like the synchronous path does.
			replyNotFound()
			slog.Warn("[Proxy] outside conn %v send call %d to service %d error %v !", from.GlobalIndex(), header.CallID, header.ServiceUUID, err)
		}
	}()

	return nil
}

// insideReturnOutside relays a proxy response from an inside service to the
// outside connection named by header.GlobalIndex, converting the proxy
// return header into a plain return header.
//
// It returns perror.ChannelNotFound when no such connection is indexed and
// perror.ChannelHasClosed (after dropping the index entry) when the
// connection is closed; otherwise it returns the result of the send.
func (p *ProxyHandler) insideReturnOutside(from transport.ITransport, header *protocol.RpcProxyCallRetHeader, data []byte) error {
	// look up the outside connection
	outside, found := p.gTransMap[header.GlobalIndex]
	switch {
	case !found:
		p.owner.logger.Warn("[Proxy] can't found outside connection by %v ", header.GlobalIndex)
		return perror.ChannelNotFound
	case outside.IsClose():
		// the connection is gone, forget it
		delete(p.gTransMap, header.GlobalIndex)
		p.owner.logger.Warn("target channel %d:%s has benn closed while trans %d !", outside.GlobalIndex(), outside.RemoteAddr(), header.CallID)
		return perror.ChannelHasClosed
	}

	// swap the proxy return header size for the plain return header size
	length := header.Length - uint32(protocol.ProxyRetHeadSize) + uint32(protocol.RespHeadSize)
	respHeader := &protocol.RpcCallRetHeader{
		RpcMsgHeader: protocol.RpcMsgHeader{
			Length: length,
			Type:   protocol.ResponseMsg,
		},
		ServerID:  header.ServerID,
		CallID:    header.CallID,
		ErrorCode: header.ErrorCode,
	}

	pkg, _ := protocol.PackRespMsg(&protocol.ResponsePackage{
		Header: respHeader,
		Buffer: data,
	})

	return outside.Send(pkg)
}

// insideCallOutSide forwards a proxy call from an inside service to the
// outside connection named by header.GlobalIndex. The call id is replaced by
// a fresh one; for calls with header.OneWay == 0 the mapping back to the
// original id and caller is recorded in gCallInfo so that
// outsideReturnInside can route the response.
//
// When the outside connection is unknown or closed the caller is answered
// with protocol.IDL_SERVICE_NOT_FOUND and perror.ChannelNotFound or
// perror.ChannelHasClosed is returned. When the send fails, the recorded
// call is removed again and the caller of a non one-way call gets the same
// answer, since no response can arrive for a request that was never sent;
// the send error is returned.
func (p *ProxyHandler) insideCallOutSide(from transport.ITransport, header *protocol.RpcProxyCallHeader, data []byte) error {
	to, ok := p.gTransMap[header.GlobalIndex]
	if !ok {
		p.outsideReturnError(from, header, protocol.IDL_SERVICE_NOT_FOUND)
		p.owner.logger.Error(" service conn %d:%q call outside %d service %d method %d, but it has been closed ! ", from.GlobalIndex(), from.RemoteAddr(), header.GlobalIndex, header.ServiceUUID, header.MethodID)
		return perror.ChannelNotFound
	}

	if to.IsClose() {
		delete(p.gTransMap, header.GlobalIndex)
		p.outsideReturnError(from, header, protocol.IDL_SERVICE_NOT_FOUND)
		p.owner.logger.Error(" service conn %d:%q call outside %d service %d method %d, but it has been closed ! ", from.GlobalIndex(), from.RemoteAddr(), header.GlobalIndex, header.ServiceUUID, header.MethodID)
		return perror.ChannelHasClosed
	}

	info := &callInfo{
		replaceCallId: p.NewReplaceID(),
		originalID:    header.CallID,
		inside:        from,
	}

	msg := &protocol.RequestPackage{
		Header: &protocol.RpcCallHeader{
			RpcMsgHeader: protocol.RpcMsgHeader{
				// swap the proxy call header size for the plain call header size
				Length: header.Length - uint32(protocol.ProxyCallHeadSize) + uint32(protocol.CallHeadSize),
				Type:   protocol.RequestMsg,
			},
			ServiceUUID: header.ServiceUUID,
			ServerID:    header.ServerID,
			CallID:      info.replaceCallId,
			MethodID:    header.MethodID,
		},
		Buffer: data,
	}

	expectsReply := header.OneWay == 0
	if expectsReply {
		// record call info
		p.gCallInfo[info.replaceCallId] = info
		// TODO  add timer ticker
	}

	// forward the packed request to the outside connection
	pack, _ := protocol.PackReqMsg(msg)
	err := to.Send(pack)
	if err != nil && expectsReply {
		// Nothing was sent, so nothing will come back: drop the record and
		// tell the caller instead of leaving both hanging.
		delete(p.gCallInfo, info.replaceCallId)
		p.outsideReturnError(from, header, protocol.IDL_SERVICE_NOT_FOUND)
	}
	return err
}

// outsideReturnInside routes a response from an outside connection back to
// the inside service that issued the call, restoring the original call id
// and converting the plain return header into a proxy return header.
//
// The pending call record is removed as soon as it is found, including when
// the inside transport has closed in the meantime; otherwise that record
// would never be released.
//
// It returns nil when no call is pending under header.CallID or when the
// inside transport is closed; otherwise it returns the result of the send.
//
// NOTE(review): the record is matched by call id only; nothing here checks
// that from is the connection the call was sent to — confirm whether that
// matters.
func (p *ProxyHandler) outsideReturnInside(from transport.ITransport, header *protocol.RpcCallRetHeader, data []byte) error {
	info, ok := p.gCallInfo[header.CallID]
	if !ok {
		// call info has time out
		p.owner.logger.Info("[Proxy] proxy call %d return inside has been time out !", header.CallID)
		return nil
	}
	// The call ends here whichever way it goes, so release the record now.
	delete(p.gCallInfo, header.CallID)

	if info.inside.IsClose() {
		return nil
	}

	msg := &protocol.ProxyRespPackage{
		Header: &protocol.RpcProxyCallRetHeader{
			RpcMsgHeader: protocol.RpcMsgHeader{
				// swap the plain return header size for the proxy return header size
				Length: header.Length - uint32(protocol.RespHeadSize) + uint32(protocol.ProxyRetHeadSize),
				Type:   protocol.ProxyResponseMsg,
			},
			ServerID:    header.ServerID,
			CallID:      info.originalID,
			ErrorCode:   header.ErrorCode,
			GlobalIndex: from.GlobalIndex(),
		},
		Buffer: data,
	}

	pkg, _ := protocol.PackProxyRespMsg(msg)

	return info.inside.Send(pkg)
}

// tryGetService returns the transport cached for service uuid on outside
// connection globalIndex, or nil when nothing is cached. A cached transport
// that reports itself closed is evicted and nil is returned.
func (p *ProxyHandler) tryGetService(globalIndex protocol.GlobalIndexType, uuid uint64) transport.ITransport {
	// Indexing a missing (nil) inner map simply reports "not found".
	services := p.gSrvTransCache[globalIndex]
	if inside, found := services[uuid]; found {
		if !inside.IsClose() {
			return inside
		}
		delete(services, uuid)
	}
	return nil
}

// transProxyCall wraps a plain call into a proxy request tagged with the
// global index of from and sends it to to. It returns the result of the send.
func (p *ProxyHandler) transProxyCall(from, to transport.ITransport, header *protocol.RpcCallHeader, data []byte) error {
	pkg, _ := protocol.PackProxyReqMsg(&protocol.ProxyRequestPackage{
		Header: protocol.BuildProxyCallHeader(header, from.GlobalIndex()),
		Buffer: data,
	})
	return to.Send(pkg)
}

// outsideReturnError answers the proxy call described by header with a
// body-less proxy response carrying errorCode, sent back on from. A failed
// send is only logged.
//
// The response has no body, so its Length is exactly the proxy return header
// size, matching the body-less response built in onTask. Deriving it from
// the request length would announce body bytes that are never sent.
//
// NOTE(review): GlobalIndex is taken from from, the transport being
// answered, not from header.GlobalIndex — confirm this is what the receiver
// expects.
func (p *ProxyHandler) outsideReturnError(from transport.ITransport, header *protocol.RpcProxyCallHeader, errorCode uint32) {
	msg := &protocol.ProxyRespPackage{
		Header: &protocol.RpcProxyCallRetHeader{
			RpcMsgHeader: protocol.RpcMsgHeader{
				Length: uint32(protocol.ProxyRetHeadSize),
				Type:   protocol.ProxyResponseMsg,
			},
			ServerID:    header.ServerID,
			CallID:      header.CallID,
			ErrorCode:   errorCode,
			GlobalIndex: from.GlobalIndex(),
		},
	}

	pkg, _ := protocol.PackProxyRespMsg(msg)
	if err := from.Send(pkg); err != nil {
		slog.Warn("send return info to %s error %v", from.RemoteAddr(), err)
	}
}
